<?php
header('Content-Type:text/html;charset=utf8;');
include '../conn.php';
$params = array(
    'exchangeName' => 'delay_exchange',
    'queueName' => 'delay_queue',
    'routeKey' => 'delay_route',
);

    $conn = new AMQPConnection($conn_args);
    $conn->connect();
    if (!$conn->isConnected()) die("Cannot connect to the broker!\n");
    $channel = new AMQPChannel($conn);
    if (!$channel->isConnected()) die('Connection through channel failed');

    $exchange = new AMQPExchange($channel);
    $exchange->setFlags(AMQP_DURABLE);
    $exchange->setName($params['exchangeName']);
    $exchange->setType(AMQP_EX_TYPE_DIRECT); //direct类型
    $exchange->declareExchange();

    //$channel->startTransaction();

    $queue = new AMQPQueue($channel);
    $queue->setName($params['queueName']);
    $queue->setFlags(AMQP_DURABLE);
    $queue->declareQueue();

    //绑定
    $queue->bind($params['exchangeName'], $params['routeKey']);

function callback(AMQPEnvelope $message,$queue) {
    if ($message) {
        $body = $message->getBody();
        echo $body . '-----------now time:'.date("Y-m-d H:i:s",time()).PHP_EOL;
        $queue->ack($message->getDeliveryTag());
    } else {
        echo 'no message' . PHP_EOL;
    }
}

$queue->consume('callback');


$start = time();
while(true)
{
    $message = $queue->get();
    if(!empty($message))
    {
        echo $message->getBody();
        $queue->ack($message->getDeliveryTag());    //应答，代表该消息已经消费
        $end = time();
        echo '<br>' . ($end - $start);
        exit();
    }

}